package process;

import bean.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Description: TODO QQ1667847363
 * @author: xiao kun tai
 * @date:2021/11/7 11:27
 */
public class Process2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> inputStream = env.socketTextStream("192.168.88.106", 7777);

        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });


        dataStream.keyBy("id")
                .process(new TempConsincreWarning(10))
                .print();

        env.execute();


    }


    /**
     * 实现自定义处理函数，检测一段时间内温度连续上升，输出报警
     */
    public static class TempConsincreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {
        /**
         * 定义当前统计时间间隔
         */
        private Integer interval;

        public TempConsincreWarning(Integer interval) {
            this.interval = interval;
        }

        //定义状态，保存上一次的温度值，定时器时间戳
        private ValueState<Double> lastTempState;
        private ValueState<Long> timerTsState;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Double>("last-temp", Double.class, Double.MIN_VALUE));
            timerTsState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("", Long.class));
        }

        @Override
        public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception {
            //取出状态
            Double lastTemp = lastTempState.value();
            Long timerTs = timerTsState.value();
            //如果温度上升且没有定时器，注册10秒的定时器，开始等待
            if (sensorReading.getTemperature() > lastTemp && timerTs == null) {
                //计算定时器时间戳
                Long ts = context.timerService().currentProcessingTime() + interval * 1000L;
                context.timerService().registerProcessingTimeTimer(ts);
                timerTsState.update(ts);
            }
            //如果温度下降
            else if (sensorReading.getTemperature() < lastTemp && timerTs != null) {
                context.timerService().deleteProcessingTimeTimer(timerTs);
                timerTsState.clear();
            }

            //更新温度状态
            lastTempState.update(sensorReading.getTemperature());

        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //只要触发输出报警信息
            out.collect("传感器 " + ctx.getCurrentKey().getField(0) +
                    " 温度值连续 " + interval + " s上升");
            timerTsState.clear();
        }

        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}
